Load raster data via WMS¶

In [ ]:
import matplotlib.pyplot as plt
from datetime import datetime
import asyncio
from websockets import connect
import pyarrow as pa
import logging
import urllib.parse
import json
import xarray as xr
import numpy as np

import geoengine as ge
In [ ]:
logger = logging.getLogger('websockets')
# logger.setLevel(logging.DEBUG)
logger.setLevel(logging.ERROR)

Initialize Geo Engine¶

In [ ]:
ge.initialize("http://localhost:3030/api")
In [ ]:
session = ge.get_session()
session
Out[ ]:
Server:              http://localhost:3030/api
Session Id:          18fec623-6600-41af-b82b-24ccf47cb9f9

Define workflow of MODIS NDVI raster¶

In [ ]:
workflow = ge.register_workflow({
                "type": "Raster",
                "operator": {
                    "type": "GdalSource",
                    "params": {
                        "data": {
                            "type": "internal",
                            "datasetId": "36574dc3-560a-4b09-9d22-d5945f2b8093"
                        }
                    }
                }
            })
workflow
Out[ ]:
c7b7c5c8-ee11-5418-bd1f-9e5889f6e04a

Query raster via Websocket¶

In [ ]:
time = datetime.strptime('2014-04-01T12:00:00.000Z', "%Y-%m-%dT%H:%M:%S.%f%z")
bbox = ge.QueryRectangle(
    ge.BoundingBox2D(-180.0, -90.0, 180.0, 90.0),
    ge.TimeInterval(time, time),
    ge.SpatialResolution(0.1, 0.1),
)

# async def handler(websocket):
    # consumer_task = asyncio.create_task(consumer_handler(websocket))
    # producer_task = asyncio.create_task(producer_handler(websocket))
    # done, pending = await asyncio.wait(
    #     [consumer_task, producer_task],
    #     return_when=asyncio.FIRST_COMPLETED,
    # )
    # for task in pending:
    #     task.cancel()

async def client(uri):
    async with connect(
        uri,
        extra_headers=ge.get_session().auth_header,
    ) as websocket:
        # await handler(websocket)

        global_tile = None

        while websocket.open:
            try:
                await websocket.send("NEXT")
                arrow_ipc = await websocket.recv()
                record_batch = read_arrow_ipc(arrow_ipc)
                tile = create_xarray(record_batch)

                # print(tile)

                print("LOCAL")
                tile.plot()
                plt.show()

                print("GLOBAL")
                global_tile = merge_tiles(global_tile, tile)
                global_tile.plot()
                plt.show()
            except Exception as e:
                print("An error occurred:")
                print(e)

        if websocket.open:
            await websocket.close()

def read_arrow_ipc(arrow_ipc):
    reader = pa.ipc.open_file(arrow_ipc)
    record_batch = reader.get_record_batch(0)
    return record_batch

def create_xarray(record_batch):
    metadata = record_batch.schema.metadata
    spatial_partition: ge.api.SpatialPartition2D = json.loads(metadata[b'spatialPartition'])
    x_size = int(metadata[b'xSize'])
    y_size = int(metadata[b'ySize'])
    arrow_array = record_batch.column(0)

    xmin = spatial_partition['upperLeftCoordinate']['x']
    xmax = spatial_partition['lowerRightCoordinate']['x']
    ymin = spatial_partition['lowerRightCoordinate']['y']
    ymax = spatial_partition['upperLeftCoordinate']['y']

    return xr.DataArray(
        arrow_array.to_numpy(zero_copy_only=False).reshape(x_size, y_size),
        dims=["y", "x"],
        coords={
            'x': np.linspace(xmin, xmax, x_size, endpoint=False),
            'y': np.linspace(ymax, ymin, y_size, endpoint=False),
            'time': datetime.strptime('2014-04-01T12:00:00.000Z', "%Y-%m-%dT%H:%M:%S.%f%z"),
        },
    )

def merge_tiles(global_tile, tile):
    if global_tile is None:
        return tile

    try:
        # can be used as long as the coordinates are not yet in the dataset (e.g., as NaN)
        return xr.combine_by_coords(
            [global_tile, tile],
        )
    except:
        # can be used to fill in the coordinates that were previously NaN
        return global_tile.combine_first(tile)

ws_server_url = ge.get_session().server_url.replace("http", "ws")
url = f"{ws_server_url}/workflow/{workflow}/rasterStream?resultType=arrow&spatialBounds={bbox.bbox_str}&timeInterval={urllib.parse.quote(bbox.time_str)}&spatialResolution={bbox.spatial_resolution}"
print(url)
await client(url)
ws://localhost:3030/api/workflow/c7b7c5c8-ee11-5418-bd1f-9e5889f6e04a/rasterStream?resultType=arrow&spatialBounds=-180.0,-90.0,180.0,90.0&timeInterval=2014-04-01T12%3A00%3A00.000%2B00%3A00&spatialResolution=0.1,0.1
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
LOCAL
GLOBAL
An error occurred:
received 1000 (OK); then sent 1000 (OK)